//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "wal_handler.h"

#include <algorithm>
#include <cinttypes>
#include <ctime>
#include <memory>
#include <vector>

#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"
#include "options/options_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace rocksdb {

int64_t WALHandler::envGetNowTime() {
  int64_t now = 0;
  Status s = env_->GetCurrentTime(&now);
  return now;
}

int64_t WALHandler::envGetHour() {
  int64_t now = 0;
  Status s = env_->GetCurrentTime(&now);
  int64_t ret = now / 3600;
  return ret;
}

// Constructs a WAL handler bound to the given environment and version set.
// NOTE(review): several initializers read other members
// (db_options_/mutable_db_options_ from initial_db_options_, env_options_
// from both) — correctness depends on the member DECLARATION order in the
// header; confirm it matches this initialization order.
WALHandler::WALHandler(const DBOptions& db_options, VersionSet* versions)
    : env_(db_options.env),
      initial_db_options_(db_options),
      db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      write_thread_(db_options_),
      env_options_(BuildDBOptions(db_options_, mutable_db_options_)),
      wal_dir_(db_options_.wal_dir),
      versions_(versions),
      stats_(initial_db_options_.statistics.get()),
      logfile_number_(0),
      wait_commit_batch_num_(0),  // writers still committing in current group
      last_commit_batch_completed_(true),
      total_log_size_(0),
      last_sequence_(1),
      last_create_wal_per_hour_(0) {  // hour stamp of the last WAL rotation
  ROCKS_LOG_INFO(initial_db_options_.info_log, "wal handler in zn-kvs mode create. wal_dir=%s",
                 wal_dir_.data());
}

// Creates (rotates to) a new WAL file. The file number encodes creation
// time: <unix-seconds> * 100 + <per-second suffix in [0, 99]>.
// Fixes: the previous log::Writer was closed but never deleted, leaking one
// writer (and its WritableFileWriter) on every rotation; uint64_t values are
// now logged with PRIu64 instead of %li.
Status WALHandler::Create() {
  Status s;
  last_create_wal_per_hour_ = envGetHour();
  if (!is_recovered_) {
    // First creation after recovery: continue the suffix sequence when the
    // recovered file belongs to the current second.
    // NOTE(review): the comparison below only matches when prefix*100 equals
    // the raw time — confirm `logfile_number_ / 100` was not intended.
    if ((logfile_number_ / 100 * 100) == envGetNowTime()) {
      logfile_number_prefix_ = envGetNowTime();
      logfile_number_suffix_ = logfile_number_ % 100 + 1;
    } else {
      logfile_number_prefix_ = envGetNowTime();
      logfile_number_suffix_ = 0;
    }
    is_recovered_ = true;
  } else if (logfile_number_prefix_ != envGetNowTime()) {
    // New second: restart the suffix sequence.
    logfile_number_prefix_ = envGetNowTime();
    logfile_number_suffix_ = 0;
  } else {
    // Same second as the previous WAL: bump the suffix.
    logfile_number_suffix_++;
  }
  logfile_number_ = logfile_number_prefix_ * 100 + logfile_number_suffix_;

  std::string log_fname = LogFileName(wal_dir_, logfile_number_);

  std::unique_ptr<WritableFile> lfile;
  DBOptions db_options = BuildDBOptions(db_options_, mutable_db_options_);
  EnvOptions opt_env_options =
      env_->OptimizeForLogWrite(env_options_, db_options);
  s = env_->NewWritableFile(log_fname, &lfile, opt_env_options);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(initial_db_options_.info_log,
                    "New Writable File Happen ERROR!!! Now File Name =%s, File Number=%" PRIu64,
                    log_fname.data(), logfile_number_);
    return s;
  }
  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
      std::move(lfile), log_fname, opt_env_options, env_));
  auto new_log =
      new log::Writer(std::move(file_writer), logfile_number_, false, false);
  if (log_writer_ != nullptr) {
    // Close AND free the previous writer; Create() owns these allocations.
    Close();
    delete log_writer_;
    log_writer_ = nullptr;
  }
  log_writer_ = new_log;
  logs_.emplace_back(logfile_number_, 0);
  return s;
}

// Thin wrapper over Env::RenameFile; renames `src` to `target`.
Status WALHandler::rename(const std::string& src, const std::string& target) {
  return env_->RenameFile(src, target);
}

// Scans the WAL directory and primes the handler state for recovery.
// Fixes: the statuses of CreateDirIfMissing and acquireWALFiles were
// silently overwritten/ignored; both are now checked before proceeding.
Status WALHandler::LoadWAL(InternalStats* internal_stats) {
  Status s = env_->CreateDirIfMissing(wal_dir_);
  if (!s.ok()) {
    return s;
  }
  default_cf_internal_stats_ = internal_stats;

  s = acquireWALFiles();
  if (!s.ok()) {
    return s;
  }
  uint64_t last_log_number = getLastLogfileNumber();
  // If there is a log in dir, reload the previous log file
  if (last_log_number) {
    is_recovered_ = false;
    logfile_number_ = last_log_number;
    total_log_size_ = getTotalLogfileSize();
  }
  return s;
}

// Deletes archived WAL files whose number is below `number`, never touching
// the file the live writer is appending to. logs_ is ordered oldest-first,
// so eligible files are always at the front.
// Fixes: the original ranged-for kept iterating the deque while pop_front()
// invalidated its iterators (undefined behavior), and it subtracted
// logs_.front().size even when the matched element was not the front.
Status WALHandler::DeleteFile(SequenceNumber number) {
  Status s;
  while (!logs_.empty()) {
    const auto& log = logs_.front();
    if (log.number >= number || log.number == log_writer_->get_log_number()) {
      break;  // front is still live / too new; nothing older remains
    }
    s = deleteWAL(log.number);
    if (!s.ok()) {
      return s;
    }
    total_log_size_ -= log.size;
    logs_.pop_front();
  }
  return s;
}

// Converts a broken-down time into the year/yday/hour/min/sec packed
// encoding used as a WAL cutoff, then deletes every file older than it.
Status WALHandler::DeleteFile(tm* time) {
  const uint64_t cutoff =
      static_cast<uint64_t>(time->tm_year) * 10000000000ULL +
      static_cast<uint64_t>(time->tm_yday) * 8640000ULL +
      static_cast<uint64_t>(time->tm_hour) * 360000ULL +
      static_cast<uint64_t>(time->tm_min) * 6000ULL +
      static_cast<uint64_t>(time->tm_sec) * 100ULL;
  return DeleteFile(cutoff);
}

// Removes the single WAL file identified by `number` from the WAL directory.
Status WALHandler::deleteWAL(uint64_t number) {
  const std::string log_fname = LogFileName(wal_dir_, number);
  return env_->DeleteFile(log_fname);
}

// NOTE(review): despite the name, this returns the COUNT of tracked WAL
// files (logs_.size()), not a file number — confirm callers expect a count.
uint64_t WALHandler::GetLogfileNumber() const { return logs_.size(); }
// Get the size of the entire WAL file
uint64_t WALHandler::getTotalLogfileSize() const {
  uint64_t ret = 0;
  for (const auto& log : logs_) {
    uint64_t number = log.size;
    ret += number;
  }
  return ret;
}
// Gets the file number of the last WAL created in the directory. If the return
// value is 0, there is no WAL in dir.
uint64_t WALHandler::getLastLogfileNumber() const {
  return logs_.empty() ? 0 : logs_.back().number;
}

// Rotates the WAL before a write when an hour boundary was crossed or the
// current file exceeds max_wal_size_.
// Fixes: logs_.back() was read without checking for an empty deque (UB);
// the rename error message labeled the target number "Source"; uint64_t
// values are now logged with PRIu64 instead of %li.
Status WALHandler::preprocessWrite() {
  PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time)
  Status s;
  int64_t now = envGetHour();

  const bool size_exceeded = !logs_.empty() && logs_.back().size > max_wal_size_;
  if (now != last_create_wal_per_hour_ || size_exceeded) {
    // Advance the time-encoded file number (see Create() for the encoding).
    if (logfile_number_prefix_ != envGetNowTime()) {
      logfile_number_prefix_ = envGetNowTime();
      logfile_number_suffix_ = 0;
    } else {
      logfile_number_suffix_++;
    }
    logfile_number_ = logfile_number_prefix_ * 100 + logfile_number_suffix_;
    if (!logs_.empty()) {
      // Rename the previous file so its on-disk name matches the new number.
      std::string log_fname = LogFileName(wal_dir_, logfile_number_);
      std::string last_fname = LogFileName(wal_dir_, logs_.back().number);
      s = rename(last_fname, log_fname);

      if (!s.ok()) {
        ROCKS_LOG_ERROR(initial_db_options_.info_log,
                        "Rename WAL File Happen ERROR!!! "
                        "Now Source File Name =%s, Source File Number=%" PRIu64
                        ", Target File Name=%s, Target File Number=%" PRIu64,
                        last_fname.data(), logs_.back().number, log_fname.data(),
                        logfile_number_);
        return s;
      }

      logs_.back().ResetNumber(logfile_number_);
    }
    s = Create();
  }
  PERF_TIMER_STOP(write_scheduling_flushes_compactions_time)
  return s;
}

// Writes `my_batch` through the group-commit pipeline: the calling thread
// joins a write group; a non-leader either helps with parallel memtable
// insertion or waits, while the leader merges the group's batches, appends
// them to the WAL, inserts into the memtables, and publishes the sequence.
// Returns the final status of this writer's batch.
Status WALHandler::Write(const WriteOptions& write_options,
                         WriteBatch* my_batch, DB* db, FlushScheduler* flush_scheduler) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time)
  RecordTick(stats_, WRITE_WITH_WAL);
  Status status;
  WriteThread::Writer w(write_options, my_batch, nullptr, 0,
                        false /* disable_memtable */);

  StopWatch write_sw(env_, stats_, DB_WRITE);
  // join the queue
  write_thread_.JoinBatchGroup(&w);
  if(initial_db_options_.allow_concurrent_memtable_write){
    // Non-leader path: a leader has already written the WAL and asked this
    // writer to insert its own batch into the memtables in parallel.
    if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {

      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());

      if(db){
        w.status = WriteBatchInternal::InsertInto(
            &w, w.sequence, &column_family_memtables, flush_scheduler,
            write_options.ignore_missing_column_families, 0 /*log_number*/, db,
            true /*concurrent_memtable_writes*/, false, w.batch_cnt);
      }

      PERF_TIMER_START(write_pre_and_post_process_time);
      if (write_thread_.CompleteParallelMemTableWriter(&w)) {
        // we're responsible for exit batch group
        auto last_sequence = w.write_group->last_sequence;
        versions_->SetLastSequence(last_sequence);
        write_thread_.ExitAsBatchGroupFollower(&w);
      }
      assert(w.state == WriteThread::STATE_COMPLETED);
      // STATE_COMPLETED conditional below handles exit
      status = w.FinalStatus();
    }
    if (w.state == WriteThread::STATE_COMPLETED) {
      return w.FinalStatus();
    }
  }else{
    // Without concurrent memtable writes, any terminal state means the
    // leader already did all the work on this writer's behalf.
    if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER
        || w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER
        || w.state == WriteThread::STATE_COMPLETED) {
      return w.FinalStatus();
    }
  }

  // From here on this thread is the group leader.
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  WriteThread::WriteGroup write_group;
  write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  // Assign consecutive sequence numbers to every writer in the group and
  // accumulate totals for stats.
  uint64_t total_batch_cnt = 0;
  size_t total_byte_size = 0;
  const SequenceNumber current_sequence = write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
  if (w.status.ok()) {
    SequenceNumber next_sequence = current_sequence;
    for (auto writer : write_group) {
      writer->sequence = next_sequence;
      size_t count = WriteBatchInternal::Count(writer->batch);
      next_sequence += count;
      total_batch_cnt += count;
      total_byte_size = WriteBatchInternal::AppendedByteSize(
          total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
    }
    write_thread_.UpdateLastSequence(current_sequence + total_batch_cnt - 1);
  }

  // NOTE(review): sequences are tracked in two places here (the write
  // thread's last sequence above and the version set's allocated sequence
  // below) — confirm they cannot diverge.
  SequenceNumber last_sequence =  versions_->FetchAddLastAllocatedSequence(total_batch_cnt);
  w.write_group->last_sequence = last_sequence + total_batch_cnt;

  // Flatten the group into a single batch for one WAL record.
  WriteBatch tmp_batch;
  size_t write_with_wal = 0;
  WriteBatch* to_be_cached_state = nullptr;
  WriteBatch* merged_batch;
  // Same holds for all in the batch group
  merged_batch = mergeBatch(write_group, &tmp_batch, &write_with_wal,
                            &to_be_cached_state, last_sequence);

  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }

  WriteBatchInternal::SetSequence(merged_batch, current_sequence);
  PERF_TIMER_STOP(write_pre_and_post_process_time)
  // Rotate/create the WAL file if needed.
  // NOTE(review): this status is overwritten by the WAL write below without
  // being checked — confirm a failed rotation should not abort the write.
  status = preprocessWrite();
  PERF_TIMER_START(write_pre_and_post_process_time)
  log::Writer* log_writer = log_writer_;
  uint64_t log_size;
  bool parallel = initial_db_options_.allow_concurrent_memtable_write &&
      write_group.size > 1;
  auto stats = default_cf_internal_stats_;
  if(stats){
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_batch_cnt, true);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_batch_cnt);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, true);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, true);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
                        true);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
  }

  PERF_TIMER_STOP(write_pre_and_post_process_time)

  // Append the merged batch to the WAL (and optionally fsync).
  if (!write_options.disableWAL) {
    PERF_TIMER_GUARD(write_wal_time)
    status = writeToWAL(*merged_batch, log_writer, nullptr, &log_size);
    if (!status.ok()) {
      // Propagate the failure to every writer in the group before exiting.
      for (auto writer : write_group) {
        writer->status = status;
      }
      write_thread_.ExitAsBatchGroupLeader(write_group, status);
      return status;
    }

    if(write_options.sync) {
      status = sync();
      if (!status.ok()) {
        ROCKS_LOG_ERROR(initial_db_options_.info_log,
                        "WAL sync record Happen ERROR!!! ");
        write_thread_.ExitAsBatchGroupLeader(write_group, status);
        return Status::IOError("WAL sync record ERROR");
      }
    }
  }

  ColumnFamilyMemTablesImpl column_family_memtables(versions_->GetColumnFamilySet());


  // Memtable insertion: parallel (each writer inserts its own batch), serial
  // group insert, or single merged insert depending on configuration.
  if(initial_db_options_.allow_concurrent_memtable_write){
    if(parallel){
      write_group.last_sequence = last_sequence + total_batch_cnt;
      write_thread_.LaunchParallelMemTableWriters(&write_group);
      assert(w.sequence == current_sequence);
      if(db){
        status = WriteBatchInternal::InsertInto(
            &w, w.sequence, &column_family_memtables, flush_scheduler,
            write_options.ignore_missing_column_families, 0 /*log_number*/,
            db, true /*concurrent_memtable_writes*/, false,
            w.batch_cnt);
      }
    }else{
      if(db) {
        status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, &column_family_memtables,
            flush_scheduler, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, db, parallel);
      }
    }
  }else{
    if(db){
      status = WriteBatchInternal::InsertInto(
          merged_batch, current_sequence, &column_family_memtables, flush_scheduler,
          true, 0 /*log_number*/, db,  false /*concurrent_memtable_writes*/);
    }
  }

  PERF_TIMER_START(write_pre_and_post_process_time)

  if(!status.ok()){
    ROCKS_LOG_ERROR(initial_db_options_.info_log,
                    "Batch InsertInto Memtable Happen ERROR!!! "
                    " current sequence=%zu, Merged Batch Size=%li, Merged Batch Count=%i, Merged Batch Data=%s",
                    current_sequence, merged_batch->GetDataSize(), merged_batch->Count(), merged_batch->Data().data());
  }

  if (status.ok()) {
    if(stats){
      const bool concurrent = true;
      stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent);
      RecordTick(stats_, WAL_FILE_BYTES, log_size);
      stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal,
                        concurrent);
      RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
    }
  }

  // Commit bookkeeping consumed by PostProcess()/waiters.
  last_commit_batch_completed_ = false;
  wait_commit_batch_num_.store(write_group.size, std::memory_order_relaxed);
  last_sequence_.store(last_sequence + total_batch_cnt, std::memory_order_relaxed);

  bool should_exit_batch_group = true;
  if(parallel){
    // The last parallel writer to finish is the one that exits the group.
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }

  if(should_exit_batch_group) {
    versions_->SetLastSequence(last_sequence_.load(std::memory_order_relaxed));
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }
    return status;
}

// Pipelined variant of Write(): WAL writing and memtable insertion are
// decoupled into separate leader roles so a WAL leader can hand off the
// memtable work and the next WAL group can start sooner.
Status WALHandler::PipelineWrite(const WriteOptions& write_options,
                         WriteBatch* my_batch, DB* db, FlushScheduler* flush_scheduler) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, stats_, DB_WRITE);

  RecordTick(stats_, WRITE_WITH_WAL);
  Status status;

  WriteThread::Writer w(write_options, my_batch, nullptr, 0,
                        false /* disable_memtable */);

  write_thread_.JoinBatchGroup(&w);
  // Phase 1: WAL group leader — assign sequences and append to the WAL.
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    if (w.callback && !w.callback->AllowWriteBatching()) {
      write_thread_.WaitForMemTableWriters();
    }

    // WAL pre-write processing (rotate/create the log file if needed).
    PERF_TIMER_STOP(write_pre_and_post_process_time)
    status = preprocessWrite();
    PERF_TIMER_START(write_pre_and_post_process_time)


    write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;

    if (w.status.ok()) {
      // Hand out consecutive sequence numbers to the writers that passed
      // their callbacks and want memtable insertion.
      SequenceNumber next_sequence = current_sequence;
      for (auto writer : wal_write_group) {
        if (writer->CheckCallback(db)) {
          if (writer->ShouldWriteToMemtable()) {
            writer->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(writer->batch);
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }

      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
    }

    // NOTE(review): unlike Write(), `stats` is dereferenced here without a
    // null check — confirm default_cf_internal_stats_ is always set first.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    log::Writer* log_writer = log_writer_;

    if (w.status.ok() && !write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
      RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
      if (wal_write_group.size > 1) {
        stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                          wal_write_group.size - 1);
        RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
      }

      WriteBatch tmp_batch;
      size_t write_with_wal = 0;
      WriteBatch* to_be_cached_state = nullptr;
      WriteBatch* merged_batch;
      // Same holds for all in the batch group
      merged_batch = mergeBatch(wal_write_group, &tmp_batch, &write_with_wal,
                                &to_be_cached_state, current_sequence + total_count - 1);
      // NOTE(review): total_byte_size (already reported to stats above) is
      // reused here as writeToWAL's log-size out-parameter and the returned
      // `status` is never checked — the exit below passes w.status instead.
      // Confirm a WAL write failure should not fail the group.
      status = writeToWAL(*merged_batch, log_writer, nullptr, &total_byte_size);
    }

    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  // Phase 2: memtable leader — insert the group's batches into memtables,
  // either serially or by launching parallel writers.
  WriteThread::WriteGroup memtable_write_group;
  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.ShouldWriteToMemtable());
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    ColumnFamilyMemTablesImpl column_family_memtables(versions_->GetColumnFamilySet());
    if (memtable_write_group.size > 1 &&
        initial_db_options_.allow_concurrent_memtable_write) {
      // Hand the work to parallel writers; this thread falls through to the
      // STATE_PARALLEL_MEMTABLE_WRITER branch below to insert its own batch.
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, flush_scheduler,
          write_options.ignore_missing_column_families, 0 /*log_number*/,
          db, true /*concurrent_memtable_writes*/, false,
          w.batch_cnt);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  }

  // Parallel memtable writer: insert this writer's own batch; the last one
  // to finish publishes the sequence and exits the group.
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, flush_scheduler,
        write_options.ignore_missing_column_families, 0 /*log_number*/, db,
        true /*concurrent_memtable_writes*/);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();

}
// Deferred processing after each batch is committed in writeGroup. The last
// writer to finish marks the commit complete, publishes the last sequence,
// and wakes up the leader of the next write group.
void WALHandler::PostProcess() {
  std::unique_lock<std::mutex> locker(mutex_);
  if (--wait_commit_batch_num_ == 0) {
    last_commit_batch_completed_ = true;
    versions_->SetLastSequence(last_sequence_.load(std::memory_order_relaxed));
    condvar_.notify_all();
  }
}

// Returns the batch that should be written to the WAL for this group:
// either the leader's own batch (single, callback-ok, untruncated) or
// tmp_batch with every surviving batch of the group appended to it.
// *write_with_wal is set to the number of batches represented; the last
// batch carrying latest-persistent-state is exposed via *to_be_cached_state.
WriteBatch* WALHandler::mergeBatch(const WriteThread::WriteGroup& write_group,
                                   WriteBatch* tmp_batch,
                                   size_t* write_with_wal,
                                   WriteBatch** to_be_cached_state,
                                   SequenceNumber seq) {
  *write_with_wal = 0;
  auto* leader = write_group.leader;

  const bool single_uncut_batch =
      write_group.size == 1 && !leader->CallbackFailed() &&
      leader->batch->GetWalTerminationPoint().is_cleared();
  if (single_uncut_batch) {
    // One batch, no callback failure, nothing to truncate: write it as-is.
    WriteBatch* result = leader->batch;
    if (WriteBatchInternal::IsLatestPersistentState(result)) {
      *to_be_cached_state = result;
    }
    *write_with_wal = 1;
    return result;
  }

  // Flatten the group into a single batch. (Copying could be avoided with
  // an iov-like AddRecord interface.)
  for (auto writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    WriteBatchInternal::Append(tmp_batch, writer->batch,
        /*WAL_only*/ true);
    if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
      // Only the last such write batch needs to be cached.
      *to_be_cached_state = writer->batch;
    }
    ++(*write_with_wal);
  }
  return tmp_batch;
}

// Serializes merged_batch and appends it as one record via log_writer.
// *log_size receives the serialized size. `log_used` is currently unused.
// Returns IOError if AddRecord fails; on success updates total_log_size_
// and the size bookkeeping of the newest tracked log file.
Status WALHandler::writeToWAL(const WriteBatch& merged_batch,
                              log::Writer* log_writer, uint64_t* log_used,
                              uint64_t* log_size) {
  Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
  *log_size = log_entry.size();

  Status status = log_writer->AddRecord(log_entry);
  if (!status.ok()) {
    ROCKS_LOG_ERROR(initial_db_options_.info_log,
                    "Batch Write to WAL Happen ERROR!!! "
                    "Merged Batch Size=%li, Merged Batch Count=%i, Merged Batch Data=%s",
                    merged_batch.GetDataSize(), merged_batch.Count(), merged_batch.Data().data());
    return Status::IOError("WAL add record ERROR");
  }

  // NOTE(review): assumes logs_ is non-empty whenever a writer exists;
  // back() on an empty deque would be undefined behavior — confirm.
  total_log_size_ += log_entry.size();
  logs_.back().AddSize(log_entry.size());
  log_empty_ = false;

  return status;
}

// Closes the active log writer, if any. The writer object itself is not
// freed here.
void WALHandler::Close() {
  if (log_writer_ != nullptr) {
    log_writer_->Close();
  }
}
// Recover all WAL files to memory: replays every tracked WAL file, oldest
// first, into `memtables`, honoring the configured WALRecoveryMode, and
// reports the highest recovered sequence through `last_sequence`.
// Fixes: the log::Reader was heap-allocated with `new` and never freed,
// leaking one reader per recovered file; it is now a stack object (matching
// iterateRecord()).
Status WALHandler::RecoverLogFilesToTables(ColumnFamilyMemTables* memtables,
                                           SequenceNumber& last_sequence) {
  Status s;

  bool stop_replay_for_corruption = false;
  // Recover to memory from the specified WAL file
  for (auto& f : logs_) {
    // NOTE(review): next_sequence is initialized per file but never advanced
    // in the loop below, so the `sequence == next_sequence` resume check can
    // only match kMaxSequenceNumber — confirm this is intended.
    SequenceNumber next_sequence(kMaxSequenceNumber);

    uint64_t number = f.number;
    std::string log_fname = LogFileName(wal_dir_, number);

    std::unique_ptr<SequentialFile> file;
    s = env_->NewSequentialFile(log_fname, &file,
                                env_->OptimizeForLogRead(env_options_));
    if (!s.ok()) {
      return s;
    }
    std::unique_ptr<SequentialFileReader> file_reader(
        new SequentialFileReader(std::move(file), log_fname));
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = db_options_.info_log.get();
    reporter.fname = log_fname.c_str();
    reporter.ignore_error = !db_options_.paranoid_checks;
    // In this mode, any IO errors during log reading are ignored. The system
    // tries to recover as much data as possible. Suitable for disaster
    // recovery.
    if (!db_options_.paranoid_checks ||
        db_options_.wal_recovery_mode ==
        WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &s;
    }

    // Stack-allocated reader (previously leaked via `new` per file).
    log::Reader reader(db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, number);
    std::string scratch;
    Slice record;
    WriteBatch batch;

    // Read item by item from the log file
    while (
        reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode)) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);
      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
      SequenceNumber batch_count = WriteBatchInternal::Count(&batch);
      // In this mode, WAL replay stops when an IO error is encountered. The
      // system reverts to a specific point in time where consistency is
      // guaranteed. This works well for systems with replication: data from
      // other replica sets can be used to replay everything after that
      // point in time to recover the system.
      if (db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if sequence id of log files are
        // consecutive, we continue recovery despite corruption. This could
        // happen when we open and write to a corrupted DB, where sequence id
        // will start from the last sequence id we recovered.
        if (sequence == next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          uint64_t bytes;
          if (env_->GetFileSize(log_fname, &bytes).ok()) {
            ROCKS_LOG_WARN(initial_db_options_.info_log, "%s: dropping %d bytes", log_fname.c_str(),
                           static_cast<int>(bytes));
          }
          break;
        }
      }
      // Insert into the memory block
      s = WriteBatchInternal::InsertInto(&batch, memtables, nullptr, true,
                                         number, nullptr, false, nullptr,
                                         nullptr);
      if (s.ok()) {
        last_sequence = sequence + batch_count - 1;
      } else {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), s);
        continue;
      }
    }
    // After exiting ReadRecord, determine the return value of the error
    // according to the recovery mode
    if (!s.ok()) {
      if (s.IsNotSupported()) {
        // We should not treat NotSupported as corruption. It is rather a clear
        // sign that we are processing a WAL that is produced by an incompatible
        // version of the code.
        return s;
      }
      if (db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        s = Status::OK();
      } else if (db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        // We should ignore the error but not continue replaying
        s = Status::OK();
        stop_replay_for_corruption = true;
        //        corrupted_log_number = number;
        ROCKS_LOG_INFO(initial_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                           " seq #%" PRIu64,
                       number, next_sequence);
      } else {
        assert(db_options_.wal_recovery_mode ==
               WALRecoveryMode::kTolerateCorruptedTailRecords ||
               db_options_.wal_recovery_mode ==
               WALRecoveryMode::kAbsoluteConsistency);
        return s;
      }
    }
  }
  last_sequence_ = last_sequence;
  return s;
}

// Reads the next record across all tracked WAL files, building one reader
// per file lazily on the first call. Returns true while records (or further
// files) remain; returns false and resets state once every file is consumed.
// Fixes: the initialization loop ignored its loop variable and always opened
// logs_.back().number, so every reader pointed at the newest file; readers_
// was also cleared without deleting the heap-allocated readers (leak).
bool WALHandler::readRecord(Slice* record, std::string* scratch) {
  Status s;
  // On the first read, initialize one reader per WAL file, oldest first.
  if (readers_.empty()) {
    reader_index_ = 0;
    for (const auto& f : logs_) {
      // Use the current file's number (was logs_.back().number, which opened
      // the newest file once per tracked log).
      uint64_t number = f.number;
      std::string log_fname = LogFileName(wal_dir_, number);
      std::unique_ptr<SequentialFile> file;
      s = env_->NewSequentialFile(log_fname, &file,
                                  env_->OptimizeForLogRead(env_options_));
      if (!s.ok()) {
        // Free any readers built so far before bailing out.
        for (auto* r : readers_) {
          delete r;
        }
        readers_.clear();
        return false;
      }
      std::unique_ptr<SequentialFileReader> file_reader(
          new SequentialFileReader(std::move(file), log_fname));

      // NOTE(review): `reporter` is a stack local captured by pointer inside
      // log::Reader, and `s` is captured via reporter.status — both dangle
      // once this function returns. Confirm the reader implementation never
      // reports corruption on a later call, or give the reporters the same
      // lifetime as readers_.
      LogReporter reporter;
      reporter.env = env_;
      reporter.info_log = db_options_.info_log.get();
      reporter.fname = log_fname.c_str();
      reporter.status = &s;
      reporter.ignore_error = !db_options_.paranoid_checks;

      auto reader =
          new log::Reader(db_options_.info_log, std::move(file_reader),
                          &reporter, true /*checksum*/, number);
      readers_.push_back(reader);
    }
  }

  // Read out a batch from the current file.
  if (readers_[reader_index_]->ReadRecord(record, scratch)) {
    if (record->size() < WriteBatchInternal::kHeader) {
      readers_[reader_index_]->GetReporter()->Corruption(
          record->size(), Status::Corruption("log record too small"));
      // TODO read record's till the first no corrupt entry?
    } else {
      return true;
    }
  } else {
    // Current file exhausted: advance to the next one.
    reader_index_++;
    // All WAL files have been read: free the readers and reset.
    if (reader_index_ >= readers_.size()) {
      for (auto* r : readers_) {
        delete r;
      }
      readers_.clear();
      reader_index_ = 0;
      return false;
    }
  }
  // The current file is not fully read (or we just moved to the next file):
  // attempt one more read before returning.
  if (readers_[reader_index_]->ReadRecord(record, scratch)) {
    if (record->size() < WriteBatchInternal::kHeader) {
      readers_[reader_index_]->GetReporter()->Corruption(
          record->size(), Status::Corruption("log record too small"));
      // TODO read record's till the first no corrupt entry?
    } else {
      return true;
    }
  }
  // NOTE(review): returning true here even when the read above failed or was
  // corrupt mirrors the original behavior — confirm callers tolerate a
  // `true` return with an unset/short record.
  return true;
}

// Lists all WAL files in wal_dir_, sorted, as LogFile descriptors.
// Fixes: `size_bytes` was read uninitialized when GetFileSize failed; it is
// now zero-initialized and the lookup status is checked.
Status WALHandler::getSortedWALFiles(VectorLogPtr& log_files) {
  Status s;
  std::vector<std::string> all_files;
  s = env_->GetChildren(wal_dir_, &all_files);

  if (!s.ok()) {
    return s;
  }

  for (const auto& f : all_files) {
    uint64_t number = 0;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
      // Record size 0 (rather than garbage) for files whose size cannot be
      // determined, keeping them visible to recovery.
      uint64_t size_bytes = 0;
      if (!env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes).ok()) {
        size_bytes = 0;
      }
      log_files.push_back(std::unique_ptr<LogFile>(
          new LogFileImpl(number, kAliveLogFile, 1, size_bytes)));
    }
  }
  CompareLogByPointer compare_log_files;
  std::sort(log_files.begin(), log_files.end(), compare_log_files);
  return s;
}

// Rebuilds logs_ from the WAL files currently present on disk.
Status WALHandler::acquireWALFiles() {
  VectorLogPtr log_files;
  Status status = getSortedWALFiles(log_files);
  if (!status.ok()) {
    return status;
  }
  logs_.clear();
  for (const auto& file : log_files) {
    logs_.emplace_back(file->LogNumber(), file->SizeFileBytes());
  }
  return status;
}
// Traverse all records of the wal file, return the total record count in
// data_sum and every record's raw contents in str; if there is no wal file,
// data_sum stays 0. The returned status is that of the last file read.
Status WALHandler::TEST_Iterate(int& data_sum,
                                std::vector<std::string>& str) {
  data_sum = 0;
  Status s;

  for (const auto& fd : logs_) {
    const std::string log_fname = LogFileName(wal_dir_, fd.number);
    s = iterateRecord(log_fname, fd.number, data_sum, str);
  }
  return s;
}

// Iterates every record of the WAL file at `fname`, appending each batch's
// raw contents to `str` and accumulating the entry count into `data_sum`.
// Fixes: the SequentialFileReader was constructed from `file` BEFORE the
// NewSequentialFile status was checked, wrapping a null file on failure;
// the check now happens first.
Status WALHandler::iterateRecord(const std::string& fname,
                                 const uint64_t number,
                                 int& data_sum,
                                 std::vector<std::string>& str) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;

    Status* status;
    bool ignore_error;  // true if db_options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
                     (this->ignore_error ? "(ignoring error) " : ""), fname,
                     static_cast<int>(bytes), s.ToString().c_str());
      if (this->status->ok()) {
        // only keep the first error
        *this->status = s;
      }
    }
  };

  std::unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(
      fname, &file, env_->OptimizeForLogRead(env_options_));
  if (!status.ok()) {
    return status;
  }
  std::unique_ptr<SequentialFileReader> file_reader(
      new SequentialFileReader(std::move(file), fname));

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = db_options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = &status;
  reporter.ignore_error = !db_options_.paranoid_checks;
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
                     true /*checksum*/, number);
  std::string scratch;
  Slice record;

  while (reader.ReadRecord(&record, &scratch)) {
    if (record.size() < WriteBatchInternal::kHeader) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small"));
      // TODO read record's till the first no corrupt entry?
    } else {
      WriteBatch batch;
      WriteBatchInternal::SetContents(&batch, record);
      str.push_back(batch.Data());
      data_sum += WriteBatchInternal::Count(&batch);
    }
  }
  // ReadRecord returns false on EOF, which means that the log file is empty;
  // we return status.ok() in that case.
  return status;
}

// Forces the current WAL file to durable storage (fsync).
Status WALHandler::sync() {
  return log_writer_->file()->Sync(true);
}
}  // namespace rocksdb